{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Pivoting Data_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 5 Aug 2016, Spark v2.0_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will discuss the differences between pivoting and reshaping, and illustrate what capabilities exist within Spark._\n",
    "\n",
    "_Main operations used: `groupBy`, `pivot`, `sum`_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Reshaping\n",
    "\n",
    "In pySpark pivoting involves an aggregation.  If what you're looking for is reshaping, where a dataset is turned from wide to long or vice versa without the loss of any information, then that is not currently *explicitly* implemented in Spark.  As we will show at the end however, **it is possible to make the `pivot` function work like a reshape from long to wide**, but not in reverse.  It is possible to \"melt\" a dataset from wide to long, but it would require the writing of a loop to do it manually, which we do not demonstrate here.\n",
    "\n",
    "The likely reason for this lack of functionality is that it's an incredibly costly operation; if you've performed it in Stata or SAS, for example, it probably took a while to compute even on a small dataset.  Doing it on very large data that is distributed across many nodes would involve a lot of shuffling and duplicating.  \n",
    "\n",
    "**It is also important to note that many of the results you might be looking for out of a reshape can be accomplished via other, more efficient means,** such as `groupby`, particularly if your goal is various types of summary statistics.  So while we can accomplish a reshape in some cases, if your data is very large it's best to think about ways to avoid having to completely restructure it."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Pivoting\n",
    "\n",
    "To illustrate how pivoting works, we create a simple dataset to experiment with:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "row = Row('state', 'industry', 'hq', 'jobs')\n",
    "\n",
    "df = sc.parallelize([\n",
    "    row('MI', 'auto', 'domestic', 716),\n",
    "    row('MI', 'auto', 'foreign', 123),\n",
    "    row('MI', 'auto', 'domestic', 1340),\n",
    "    row('MI', 'retail', 'foreign', 12),\n",
    "    row('MI', 'retail', 'foreign', 33),\n",
    "    row('OH', 'auto', 'domestic', 349),\n",
    "    row('OH', 'auto', 'foreign', 101),\n",
    "    row('OH', 'auto', 'foreign', 77),\n",
    "    row('OH', 'retail', 'domestic', 45),\n",
    "    row('OH', 'retail', 'foreign', 12)\n",
    "    ]).toDF()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------+--------+----+\n",
      "|state|industry|      hq|jobs|\n",
      "+-----+--------+--------+----+\n",
      "|   MI|    auto|domestic| 716|\n",
      "|   MI|    auto| foreign| 123|\n",
      "|   MI|    auto|domestic|1340|\n",
      "|   MI|  retail| foreign|  12|\n",
      "|   MI|  retail| foreign|  33|\n",
      "|   OH|    auto|domestic| 349|\n",
      "|   OH|    auto| foreign| 101|\n",
      "|   OH|    auto| foreign|  77|\n",
      "|   OH|  retail|domestic|  45|\n",
      "|   OH|  retail| foreign|  12|\n",
      "+-----+--------+--------+----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Pivot operations must always be preceeded by a groupBy operation.  In our first case we will simply pivot to show total domestic versus foreign jobs in each of our two states:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_pivot1 = df.groupby('state').pivot('hq', values=['domestic', 'foreign']).sum('jobs')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------+-------+\n",
      "|state|domestic|foreign|\n",
      "+-----+--------+-------+\n",
      "|   MI|    2056|    168|\n",
      "|   OH|     394|    190|\n",
      "+-----+--------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_pivot1.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Note that the `values=['domestic', 'foreign']` part of the pivot method is optional.**  If we do not supply a list then pySpark will attempt to infer the values by looking through the pivot column we specified, but naturally that requires more processing than if we specify its contents up front.  It's important `values` is specified correctly however; **pySpark will not look to see if you've left any possible values out, and will just discard any observations that don't match what you told it to look for.**  If in doubt, it may be better to accept the inefficiency and let pySpark automatically determine them.\n",
    "\n",
    "As your datasets get larger this sort of optimization becomes more important.  Also note that Spark has a hard limit of 10,000 columns created as a result of a pivot command.\n",
    "\n",
    "Here's another example, this time pivoting by both `state` and by `industry` by simply changing the groupby criteria:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_pivot = df.groupBy('state', 'industry').pivot('hq', values=['domestic', 'foreign']).sum('jobs')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------+--------+-------+\n",
      "|state|industry|domestic|foreign|\n",
      "+-----+--------+--------+-------+\n",
      "|   OH|  retail|      45|     12|\n",
      "|   MI|    auto|    2056|    123|\n",
      "|   OH|    auto|     349|    178|\n",
      "|   MI|  retail|    null|     45|\n",
      "+-----+--------+--------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_pivot.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `sum` method at the end **can be replaced with other aggregators as necessary**, for example with `avg`.\n",
    "\n",
    "# Using Pivot to Reshape Long to Wide\n",
    "\n",
    "Pivot requires an aggregation argument at the end, as we have been using.  However, what if *each row is uniquely defined* by the `groupby` and `pivot` columns?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "row = Row('state', 'industry', 'hq', 'jobs', 'firm')\n",
    "\n",
    "df = sc.parallelize([\n",
    "    row('MI', 'auto', 'domestic', 716, 'A'),\n",
    "    row('MI', 'auto', 'foreign', 123, 'B'),\n",
    "    row('MI', 'auto', 'domestic', 1340, 'C'),\n",
    "    row('MI', 'retail', 'foreign', 12, 'D'),\n",
    "    row('MI', 'retail', 'foreign', 33, 'E'),\n",
    "    row('OH', 'retail', 'mixed', 978, 'F'),\n",
    "    row('OH', 'auto', 'domestic', 349, 'G'),\n",
    "    row('OH', 'auto', 'foreign', 101, 'H'),\n",
    "    row('OH', 'auto', 'foreign', 77, 'I'),\n",
    "    row('OH', 'retail', 'domestic', 45, 'J'),\n",
    "    row('OH', 'retail', 'foreign', 12, 'K'),\n",
    "    row('OH', 'retail', 'mixed', 1, 'L'),\n",
    "    row('OH', 'auto', 'other', 120, 'M'),\n",
    "    row('OH', 'auto', 'domestic', 96, 'A'),\n",
    "    row('MI', 'auto', 'foreign', 1117, 'A'),\n",
    "    row('MI', 'retail', 'mixed', 9, 'F'),\n",
    "    row('MI', 'auto', 'foreign', 11, 'B')\n",
    "    ]).toDF()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------+--------+----+----+\n",
      "|state|industry|      hq|jobs|firm|\n",
      "+-----+--------+--------+----+----+\n",
      "|   MI|    auto|domestic| 716|   A|\n",
      "|   MI|    auto| foreign| 123|   B|\n",
      "|   MI|    auto|domestic|1340|   C|\n",
      "|   MI|  retail| foreign|  12|   D|\n",
      "|   MI|  retail| foreign|  33|   E|\n",
      "|   OH|  retail|   mixed| 978|   F|\n",
      "|   OH|    auto|domestic| 349|   G|\n",
      "|   OH|    auto| foreign| 101|   H|\n",
      "|   OH|    auto| foreign|  77|   I|\n",
      "|   OH|  retail|domestic|  45|   J|\n",
      "|   OH|  retail| foreign|  12|   K|\n",
      "|   OH|  retail|   mixed|   1|   L|\n",
      "|   OH|    auto|   other| 120|   M|\n",
      "|   OH|    auto|domestic|  96|   A|\n",
      "|   MI|    auto| foreign|1117|   A|\n",
      "|   MI|  retail|   mixed|   9|   F|\n",
      "|   MI|    auto| foreign|  11|   B|\n",
      "+-----+--------+--------+----+----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We've now added a unique identifier for each firm, which we will use instead of state and industry as our groupby criteria.  We also expanded the number of values in the `hq` column:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can now uniquely identify each row observation by the combination of its `firm`, `state` and `industry` entries, then show the different entries for `hq` for that observation as their own columns.  It drops any columns that we don't use anywhere, but if we wanted to keep them we could just include them in the groupby criteria without changing the logic of the operation:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_pivot = df.groupBy('firm', 'state', 'industry').pivot('hq', values=['domestic', 'foreign', 'mixed', 'other']).sum('jobs')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+-----+--------+--------+-------+-----+-----+\n",
      "|firm|state|industry|domestic|foreign|mixed|other|\n",
      "+----+-----+--------+--------+-------+-----+-----+\n",
      "|   D|   MI|  retail|    null|     12| null| null|\n",
      "|   I|   OH|    auto|    null|     77| null| null|\n",
      "|   G|   OH|    auto|     349|   null| null| null|\n",
      "|   J|   OH|  retail|      45|   null| null| null|\n",
      "|   C|   MI|    auto|    1340|   null| null| null|\n",
      "|   A|   MI|    auto|     716|   1117| null| null|\n",
      "|   K|   OH|  retail|    null|     12| null| null|\n",
      "|   B|   MI|    auto|    null|    134| null| null|\n",
      "|   F|   MI|  retail|    null|   null|    9| null|\n",
      "|   E|   MI|  retail|    null|     33| null| null|\n",
      "|   M|   OH|    auto|    null|   null| null|  120|\n",
      "|   H|   OH|    auto|    null|    101| null| null|\n",
      "|   F|   OH|  retail|    null|   null|  978| null|\n",
      "|   L|   OH|  retail|    null|   null|    1| null|\n",
      "|   A|   OH|    auto|      96|   null| null| null|\n",
      "+----+-----+--------+--------+-------+-----+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_pivot.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "All we're doing is telling it to `sum` each grouping of values, but **each grouping only has a single entry.**  Our data is now reshaped from long to wide.  If we replaced the `sum` operator with `mean` or `max` or `min`, it wouldn't change anything.\n",
    "\n",
    "**Note one potential problem here:** performing this operation in software like Stata will raise an error if your specified reshape criteria isn't unique.  When using `pivot` like this in pySpark there wouldn't be an error; it would just perform the aggregation.  This may be problematic if it happens and you were not expecting it to, so it may be necessary to write in some validity tests after the operation completes.  "
   ]
  },
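  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One possible validity test, sketched here under the assumption that `df` is still the long dataset defined above (the names `key_cols` and `dupes` are ours): count the rows for each combination of the `groupBy` columns and the pivot column, and flag any combination that occurs more than once.  It can be run before or after the pivot, since it only reads the long data:\n",
    "\n",
    "```python\n",
    "# Key that should identify at most one row: the groupBy columns plus the pivot column\n",
    "key_cols = ['firm', 'state', 'industry', 'hq']\n",
    "\n",
    "# Count rows per key and keep only the keys that occur more than once\n",
    "dupes = df.groupBy(*key_cols).count().filter('count > 1')\n",
    "dupes.show()\n",
    "\n",
    "# An empty result means the pivot behaved as a pure reshape\n",
    "print(dupes.count() == 0)\n",
    "```\n",
    "\n",
    "Run on the `df` above, this lists the (`B`, `MI`, `auto`, `foreign`) key, which occurs twice."
   ]
  },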
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
